from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib.lines as mlines
import seaborn as sns
sns.set(font_scale=1.3, style='whitegrid', palette='Set2')
from IPython.display import display
import numpy as np
import numpy.linalg as la
import pandas as pd
import scipy.stats as sps
from scipy.spatial import KDTree
# load and show an image with Pillow
from PIL import Image
import copy
import random
import os
from time import time, sleep
from datetime import datetime
import math
from tqdm.notebook import tqdm
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
import cvxpy as cvx
print(cvx.installed_solvers())
from google.colab import drive
drive.mount('/content/gdrive/')
%ls
%cd 'gdrive/MyDrive/mipt-opt/kr2'
%%shell
jupyter nbconvert --to html mipt-opt-kr2.ipynb
import builtins
class Mocking(object):
def __init__(self, new_print):
self.new_print = new_print
self.real_print = builtins.print
def __enter__(self):
builtins.print = self.new_print
return self
def __exit__(self, exc_type, exc_val, exc_tb):
builtins.print = self.real_print
class MockPrint(object):
def __init__(self, real_print=builtins.print):
self.real_print = real_print
self.prev_time = None
def __call__(self, *args, **kwargs):
arg = datetime.now().time() if self.prev_time is None else datetime.now() - self.prev_time
self.real_print(f'[{str(arg):>15}]', *args, **kwargs)
def tik(self):
self.prev_time = datetime.now()
with Mocking(MockPrint()):
print('Hello')
print.tik()
print("Hello")
sleep(0.5)
print("Hello")
print("Hello")
def MockingPrint(mock, tik=True):
def decorator(function):
def wrapper(*args, **kwargs):
with Mocking(mock):
if tik:
print.tik()
return function(*args, **kwargs)
return wrapper
return decorator
%ls demo
cao = Image.open('demo/cao1.jpg')
ksiwek = Image.open('demo/ksiwek1.jpg')
zhang = Image.open('demo/zhang1.jpg')
display(cao, ksiwek, zhang)
def mix_images(images, coef):
return np.dot(np.array(images).T, coef).T
def show_images(images, adjust=False, figsize=(15, 9), shape=None, imshow_params=dict()):
images = np.array(images)
if shape is None:
N = int(np.ceil(np.sqrt(images.shape[0])))
M = int(np.floor(images.shape[0] / N))
else:
N = shape[0]
M = shape[1]
if adjust:
figsize=(figsize[0], figsize[0] * N / M)
fig, axs = plt.subplots(N, M, figsize=figsize)
axs = axs.flatten()
for im, ax in zip(images, axs):
ax.imshow(im, **imshow_params)
ax.axis('off')
plt.subplots_adjust(wspace=0.0, hspace=0.0)
plt.tight_layout(w_pad=0.0, h_pad=0.0)
plt.show()
guys = [
np.asarray(Image.open('demo/cao1.jpg')),
np.asarray(Image.open('demo/ksiwek1.jpg')),
np.asarray(Image.open('demo/zhang1.jpg'))
]
images = []
for _ in range(4):
images = images + guys
alphas = np.random.uniform(size=3)
alphas = alphas / alphas.sum()
images.append(mix_images(guys, alphas))
show_images(images, adjust=True, figsize=(12, 12), shape=(4, 4), imshow_params={'cmap':'gray', 'vmin':0.0, 'vmax':255.0})
(картинки не очень детализированные, поэтому из эстетических соображений с ним работать не будем)
!wget "http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz" -O cifar-10-python.tar.gz
%mv cifar-10-python.tar.gz cifar.tar.gz
%ls
!tar -xvzf "cifar.tar.gz"
def unpickle(file):
import pickle
with open(file, 'rb') as fo:
dict = pickle.load(fo, encoding='bytes')
return dict
cifar_images = unpickle('cifar-10-batches-py/data_batch_1')[b'data']
cifar_images
cifar_images = cifar_images.reshape((10000, 3, 32, 32))
cifar_images.shape
batch = np.swapaxes(np.swapaxes(cifar_images[0:4],1,2), 2, 3)
show_images(batch, adjust=True, figsize=(6, 6), shape=(2, 2))
!wget "http://chaladze.com/l5/img/Linnaeus 5 128X128.rar" -O 'Linnaeus 5 128X128.rar'
%mv 'Linnaeus 5 128X128.rar' Linnaeus.rar
%ls
%ls
!mkdir linna_data
!unrar x Linnaeus.rar ./linna_data
path = 'linna_data'
linna_image_paths = []
for root, dirs, files in os.walk(path):
for file in files:
linna_image_paths.append(os.path.join(root,file))
len(linna_image_paths)
N = 3
M = 4
images = []
for _ in range(M):
new_images = [np.asarray(Image.open(path)) / 255 for path in np.random.choice(linna_image_paths, N, replace=False)]
images = images + new_images
alphas = np.random.uniform(size=N)
alphas = alphas / alphas.sum()
images.append(mix_images(new_images, alphas))
show_images(images, adjust=True, figsize=(12, 12), shape=(M, N + 1))
cvxpy¶Функция, которая понадобится в алгоритме
def solve_LP(r, C, d, ProblemType=cvx.Minimize, solve_params=dict()):
L = C.shape[1]
alpha = cvx.Variable(L)
constraints = [ C @ alpha + d >= 0 ]
obj = ProblemType(r @ (C @ alpha + d))
prob = cvx.Problem(obj, constraints=constraints)
prob.solve(**solve_params)
return prob.value, alpha.value
C = np.array([
[1, 0],
[0, 1],
[-1, -1]
])
d = np.array([-2, -2, 8])
params = {'verbose':False}
r = np.array([0, 0, 1])
p, alpha = solve_LP(r, C, d, solve_params=params)
p, alpha
r = np.array([1, 0, 0])
p, alpha = solve_LP(r, C, d, solve_params=params)
p, alpha
r = np.array([0, 1, 0])
p, alpha = solve_LP(r, C, d, solve_params=params)
p, alpha
r = np.array([1, 1, 0])
p, alpha = solve_LP(r, C, d, ProblemType=cvx.Maximize, solve_params=params)
p, alpha
r = np.array([1, 1, 1])
solve_LP(r, C, d, ProblemType=cvx.Maximize, solve_params=params), solve_LP(r, C, d, ProblemType=cvx.Minimize, solve_params=params) # направление перпендикулярно плоскости
def svd_solve(svd_solver, A, k=None, vectors=True, **kwargs):
import scipy.linalg as sla
import numpy.linalg as la
import scipy.sparse.linalg as spla
if svd_solver == 'numpy':
if vectors != False:
U, S, Vh = la.svd(A, compute_uv=True, **kwargs)
else:
S = la.svd(A, compute_uv=False, **kwargs)
elif svd_solver == 'scipy':
if vectors != False:
U, S, Vh = sla.svd(A, compute_uv=True, **kwargs)
else:
S = sla.svd(A, compute_uv=False, **kwargs)
elif svd_solver == 'scipy.sparse':
if k is None:
raise ValueError("Incorrect k for scipy.sparse.svds")
U, S, Vh = spla.svds(A, k=k, return_singular_vectors=vectors, **kwargs)
else:
raise ValueError("Incorrect solver")
if vectors == 'u':
return U, S
elif vectors == 'vh':
return S, Vh
elif vectors == True:
return U, S, Vh
return S
def solve_LP(r, C, d, ProblemType=cvx.Minimize, solve_params=dict(), verbose=True):
if verbose:
print(f'Searching LP solution')
L = C.shape[1]
alpha = cvx.Variable(L)
constraints = [ C @ alpha + d >= 0 ]
obj = ProblemType(r @ (C @ alpha + d))
prob = cvx.Problem(obj, constraints=constraints)
prob.solve(**solve_params)
if verbose:
print(f'Found LP solution: {alpha.value}, value = {prob.value}')
return prob.value, alpha.value
def is_exp_pt(C, d, alpha, tol=1e-6, verbose=True):
'''
Verify if alpha is an extreme point of the polyhedral set
{ alpha | C@alpha + d >= 0 }
:param (C, d) - the set of polyhedron parameter
:param alpha - the point to be tested
:param tol - specifies the numerical tolerance
:return Is alpha an extreme point
:rtype bool
'''
if verbose:
print(f'Checking if extreme point: {alpha}')
T = C[(np.abs(C@alpha + d) < tol), :]
s = svd_solve('numpy', T, vectors=False)
s = np.abs(s)
temp = s / s.sum()
return np.sum(temp > tol) == C.shape[1]
def get_affine_set(X, N, tol=1e-6, svd_solver='numpy', verbose=True):
'''
Generate an affine set (C, d) from matrix
of observations x
'''
if verbose:
print(f'Generating (C, d) affine set')
index = np.abs(X).sum(axis=1) > tol
Xn = X[index, :]
d = Xn.mean(axis=1)
params = {'which':'LM'} if svd_solver == 'scipy.sparse' else {'full_matrices':True}
C, _ = svd_solve(svd_solver, Xn - d[:, np.newaxis], k=N-1, vectors='u', **params)
C = C[:, :N - 1]
if verbose:
print(f'(C, d) affine set found')
return C, d, index
@MockingPrint(MockPrint())
def CAMNS_LP(X, N, lp_tol=1e-3, ext_tol=1e-6, zeros_tol=1e-6, max_iter=None, svd_solver='numpy', verbose=True):
'''
A practical implementation of the CAMNS-LP method
:param X - the (L, M) observation array, where M is the number of
observations.
:param N - the number of sources.
:param lp_tol - tolerance for numerical errors of LP
:param ext_tol - tolerance for extreme-point validation
:param zeros_tol - tolerance for eliminating zero observation points
:return the L-by-N extracted source matrix, where L is the data length
'''
# ------------Step 0
# -- Get affine set C, d from matrix of observations
L, M = X.shape
C, d, index = get_affine_set(X, N, tol=zeros_tol, svd_solver=svd_solver, verbose=verbose)
LL = index.shape[0]
# --------LP Extreme-Point Finding Algorithm [Table 1]---------------
# ------------Step 1
S = np.empty(shape=(LL, 0))
lp_cnt = 0
iter = 0
# B = np.eye(LL)
Q = np.zeros(shape=(LL, LL))
# --------------Step 7 (repeat until l == N)
while S.shape[1] < N and (max_iter is None or iter < max_iter):
iter += 1
if verbose:
print(f'Starting iter {iter}')
# --------------Step 2
w = sps.norm.rvs(size=LL)
# aka r = B @ w = (I_LL - QQ^T)w
r = w - Q @ (Q.T @ w)
# --------------Step 3
p_star, alpha_1 = solve_LP(r, C, d, ProblemType=cvx.Minimize, verbose=verbose)
q_star, alpha_2 = solve_LP(r, C, d, ProblemType=cvx.Maximize, verbose=verbose)
c_1 = C @ alpha_1 + d
c_2 = C @ alpha_2 + d
# --------------Step 4-5
if (S.shape[1] == 0 or np.abs(p_star) / (la.norm(r) * la.norm(c_1))
>= lp_tol) and is_exp_pt(C, d, alpha_1, tol=ext_tol, verbose=verbose):
S = np.append(S, c_1[:, np.newaxis], axis=1)
if (S.shape[1] == 0 or np.abs(q_star) / (la.norm(r) * la.norm(c_2))
>= lp_tol) and is_exp_pt(C, d, alpha_2, tol=ext_tol, verbose=verbose):
S = np.append(S, c_2[:, np.newaxis], axis=1)
# --------------Step 6
Q, _ = la.qr(S)
if verbose:
print(f'{S.shape[1]} extreme points found')
if S.shape[1] > N: # if actually l = S.shape[1] = N + 1
S = S[:, :N] if p_star > q_star else S[:, np.arange(N + 1) != N - 1]
ans = np.zeros(shape=(L, N))
ans[index, :] = S
return ans
def find_closests(real, result):
tree = KDTree(result.T)
return tree.query(real.T)
def experiment(N, M, L, verbose=True):
real = np.random.uniform(size=(L, N))
A = np.random.uniform(size=(N, M))
A = A / A.sum(axis=0)
observ = real @ A
if verbose:
print("--------------Algorithm output--------------")
result = CAMNS_LP(observ, N, svd_solver='scipy.sparse', verbose=verbose)
if verbose:
print("-------------------End----------------------\n\n")
print('--------Maxs and mins of result vectors:-----')
for j in result.T:
print("({:0.3f}, {:0.3f})".format(np.min(j), np.max(j)), end=" ")
print('\n--------------------------------------------\n\n')
ordinal = lambda n: "%d%s" % (n,"tsnrhtdd"[(n//10%10!=1)*(n%10<4)*n%10::4])
tree = KDTree(result.T)
dists, inds = find_closests(real, result)
for i, (dist, ind) in enumerate(zip(dists, inds)):
print(f"For {ordinal(i)} real vector the closest result is {ordinal(ind)}, distance {dist:.03f}")
experiment(3, 10, 128*128)
experiment(3, 10, 1000)
experiment(3, 10, 100)
В статье упоминается, что алгоритм хорошо работает при $L \gg max(M, N)$
На практике видим, как с уменьшением размера данных точность алгоритма ухудшается.
guys = np.array(guys)
def mix_scientists(M=3, figsize=(12, 12), svd_solver='numpy', show=True, verbose=True):
images = guys
images = np.append(images, np.ones(shape=(M - 3, 128, 128)) * 255, axis=0)
alphas = np.random.uniform(size=(M, 3))
alphas = alphas / alphas.sum(axis=1)[:, np.newaxis]
mixes = np.tensordot(alphas, guys, axes=(1, 0))
print("--------------Algorithm output--------------")
result = CAMNS_LP(mixes.reshape(M, 128*128).T, 3, svd_solver=svd_solver, verbose=verbose).T.reshape(3, 128, 128)
print("-------------------End----------------------\n\n")
dists, inds = find_closests(guys.reshape(3, 128*128).T, result.reshape(3, 128*128).T)
images = np.append(images, mixes, axis=0)
images = np.append(images, result, axis=0)
images = np.append(images, np.ones(shape=(M - 3, 128, 128)) * 255, axis=0)
images = np.append(images, result[inds, :, :], axis=0)
images = np.append(images, np.ones(shape=(M - 3, 128, 128)) * 255, axis=0)
if show:
show_images(images, adjust=True, figsize=figsize, shape=(4, M), imshow_params={'cmap':'gray', 'vmin':0.0, 'vmax':255.0})
mix_scientists(M=3, figsize=(9, 9), svd_solver='numpy', show=False)
mix_scientists(M=3, figsize=(9, 9), svd_solver='scipy', show=False)
mix_scientists(M=3, figsize=(9, 9), svd_solver='scipy.sparse', show=True)
mix_scientists(M=4, figsize=(9, 9), svd_solver='numpy', show=False)
mix_scientists(M=4, figsize=(9, 9), svd_solver='scipy', show=False)
mix_scientists(M=4, figsize=(9, 9), svd_solver='scipy.sparse', show=True)
mix_scientists(M=5, figsize=(9, 9), svd_solver='numpy', show=False)
mix_scientists(M=5, figsize=(9, 9), svd_solver='scipy', show=False)
mix_scientists(M=5, figsize=(9, 9), svd_solver='scipy.sparse', show=True)
В результате видим, что scipy.sparse.linalg.svds в данном случае работает сильно быстрее, будем далее пользоваться им
def mix_random_linnaeus_grey(N=3, M=3, figsize=(12, 12), svd_solver='numpy', show=True, verbose=True):
real = np.array([np.asarray(Image.open(path).convert('L')) for path in np.random.choice(linna_image_paths, N, replace=False)])
images = np.append(real, np.ones(shape=(M - N, 128, 128))*255, axis=0)
alphas = np.random.uniform(size=(M, N))
alphas = alphas / alphas.sum(axis=1)[:, np.newaxis]
mixes = np.tensordot(alphas, real, axes=(1, 0))
if verbose:
print("--------------Algorithm output--------------")
result = CAMNS_LP(mixes.reshape(M, 128*128).T, N, svd_solver=svd_solver, verbose=verbose).T.reshape(N, 128, 128)
if verbose:
print("-------------------End----------------------\n\n")
dists, inds = find_closests(real.reshape(N, 128*128).T, result.reshape(N, 128*128).T)
images = np.append(images, mixes, axis=0)
images = np.append(images, result, axis=0)
images = np.append(images, np.ones(shape=(M - N, 128, 128))*255, axis=0)
images = np.append(images, result[inds, :, :], axis=0)
images = np.append(images, np.ones(shape=(M - N, 128, 128))*255, axis=0)
if show:
show_images(images, adjust=True, figsize=figsize, shape=(4, M), imshow_params={'cmap':'gray', 'vmin':0.0, 'vmax':255.0})
mix_random_linnaeus_grey(N=3, M=3, figsize=(9, 9), svd_solver='scipy.sparse')
mix_random_linnaeus_grey(N=4, M=5, figsize=(12, 12), svd_solver='scipy.sparse')
mix_random_linnaeus_grey(N=5, M=7, figsize=(12, 12), svd_solver='scipy.sparse')
mix_random_linnaeus_grey(N=10, M=10, figsize=(12, 12), svd_solver='scipy.sparse', verbose=False)
Видим, что при M равным или лишь чуть больше N, алгоритм отрабатывает не очень. Посмотрим, что будет, если увеличить значение M
mix_random_linnaeus_grey(N=4, M=7, figsize=(12, 20), svd_solver='scipy.sparse', verbose=False)
mix_random_linnaeus_grey(N=5, M=10, figsize=(15, 25), svd_solver='scipy.sparse', verbose=False)
mix_random_linnaeus_grey(N=8, M=16, figsize=(20, 25), svd_solver='scipy.sparse', verbose=False)
PS. Почему-то ноутбук бесится (выжирает память и довольный умирает), если использовать все изображения целиком, поэтому приходится извращаться и уменьшать размер изображения для нормальной работы
Судя по всему, это происходит где-то на svd, ибо там могут получаться большие матрицы при поиска декомпозиции
def mix_random_linnaeus_color(N=3, M=3, figsize=(12, 12), im_shape=(96, 96), svd_solver='numpy', verbose=True, show=True):
imc_shape = (*im_shape, 3)
flat_shape = np.prod(imc_shape)
real = np.array([np.asarray(Image.open(path).resize(im_shape)) / 255 for path in np.random.choice(linna_image_paths, N, replace=False)])
images = np.append(real, np.ones(shape=(M - N, *imc_shape)), axis=0)
alphas = np.random.uniform(size=(M, N))
alphas = alphas / alphas.sum(axis=1)[:, np.newaxis]
mixes = np.tensordot(alphas, real, axes=(1, 0))
if verbose:
print("--------------Algorithm output--------------")
result = CAMNS_LP(mixes.reshape(M, flat_shape).T, N, svd_solver=svd_solver, verbose=verbose).T.reshape(N, *imc_shape)
if verbose:
print("-------------------End----------------------\n\n")
result = np.clip(result, 0, 1)
dists, inds = find_closests(real.reshape(N, flat_shape).T, result.reshape(N, flat_shape).T)
images = np.append(images, mixes, axis=0)
images = np.append(images, result, axis=0)
images = np.append(images, np.ones(shape=(M - N, *imc_shape)), axis=0)
images = np.append(images, result[inds, :, :], axis=0)
images = np.append(images, np.ones(shape=(M - N, *imc_shape)), axis=0)
if show:
show_images(images, adjust=True, figsize=figsize, shape=(4, M))
mix_random_linnaeus_color(
N=3,
M=3,
figsize=(9, 9),
im_shape=(96, 96),
svd_solver='scipy.sparse',
verbose=False
)
mix_random_linnaeus_color(
N=4,
M=4,
figsize=(12, 12),
im_shape=(96, 96),
svd_solver='scipy.sparse',
verbose=False
)
mix_random_linnaeus_color(
N=4,
M=7,
figsize=(15, 20),
im_shape=(96, 96),
svd_solver='scipy.sparse',
verbose=False
)
P.S. С увеличением N и M все чаще случаются ситуации с плохими множествами C, d, что возможно случается из-за частых проблем с зависимостями смесей данных
start = datetime.now()
mix_random_linnaeus_color(
N=20,
M=40,
figsize=(15, 20),
im_shape=(64, 64),
svd_solver='numpy',
verbose=False,
show=False
)
end = datetime.now()
print(f"Finished in {end - start}")
start = datetime.now()
mix_random_linnaeus_color(
N=20,
M=40,
figsize=(15, 20),
im_shape=(64, 64),
svd_solver='scipy',
verbose=False,
show=False
)
end = datetime.now()
print(f"Finished in {end - start}")
start = datetime.now()
mix_random_linnaeus_color(
N=20,
M=40,
figsize=(15, 20),
im_shape=(64, 64),
svd_solver='scipy.sparse',
verbose=False,
show=False
)
end = datetime.now()
print(f"Finished in {end - start}")
Алгоритм в целом очень классно работает, не используя никаких нейронок, сложных ML моделей и т.п., а только решая некоторую не очень сложную задачу оптимизации
При этом, несмотря на утверждения выше, он не всегда справляется с вытаскиванием всех исходных картинок корректно. Происходит это скорее всего потому, что
Может не выполняться A2 (про local dominant) так как картинки произвольные, чаще всего, везде имеют какой-то ненулевой цвет
Может не выполняться A4, так как смеси генерируются рандомно, как следствие - могут случаться линейно зависимые смеси и ранг матрицы наблюдений будет ниже N. От этого спасает увеличение числа смесей картинок. Однако и тогда могут случаться ситуации, когда из-за зависимостей в данных поиск максимума или минимума ломается
Сответственно про такие случаи нельзя сказать, может и должен ли алгоритм работать корректно. При этом, с увеличением числа наблюдений алгоритм начинает работать лучше и давать лучшие показатели, т.к. информации для обработки становится больше, что с больошой вероятностью помогает справиться с A4 и возможно как-то улучшает ситуацию в A2
Кроме того, с временем работы помогает разобраться использование scipy.sparse.linalg.svds вместо scipy.linalg.svd и numpy.linalg.svd, работающее несколько быстрее.
(результат 2 - получено много забавных фотоколлажей с животными в стиле "Вьетнамские флешбеки")